自律给我自由

积累 自驱 自制 坚持

0%

Flink - 有状态的实时计算

本文主要从使用层面介绍Flink独树一帜的设计, 帮助大家理解并更好的利用这些特性

Apache Flink的定义

Apache Flink是在无边界和有边界的数据流上进行有状态计算的框架.

同行对比

面对已经在实时领域耕耘了许久的两位老前辈: Storm和Spark Streaming, Flink有什么优势能够脱颖而出呢?

  • Spark Streaming 攒一段数据再计算, 本质还是批处理, 涉及到shuffle还是会有落盘, fetch和merge等操作. API用起来很流畅, 但是不适合维护State. Spark Streaming更适合做etl.
  • Storm Flink API跟Spark Streaming很像,但编程模型跟Storm更为接近。Storm自身不支持State,需要用户自己来维护,为了高可用和故障恢复, 用户通常会选择Redis做缓存,不过也由此也增加了对外部系统的依赖,同时也带来了额外的开销。

由此可见, Storm和Spark Streaming在State的处理上都有些力不从心, Flink与之相比最大的优势就是提供了完善的State的支持.

State

每个重要的流应用程序都是有状态的, 只有少数仅对事件做转换, 并且事件彼此独立的应用程序不需要状态. 任何运行基本业务逻辑的程序都需要记住事件或中间结果, 以便在后续的处理中访问它们.

状态

从Flink在state处理上下文中提供的所有特性可以看到, state是Flink里的一等公民.

  • 多个state原型 flink为不同的数据结构提供了state原型, 比如原子value、list或map. 开发者可以结合业务逻辑选择合适的state原型.
  • 可插拔的state Backend 不同的后端state存放的位置不一样, 比如你可以配置RocksDBStateBackend, flink会把state保存在RocksDB中. RocksDB是一个高效的嵌入式磁盘KV数据库, 类似于HBASE
  • Exactly-once一致性状态保证 flink的检查点与故障恢复机制保证了应用程序的state在崩溃时的一致性, 因此失败是透明的, 不会影响应用的准确性.
  • 支持超大state flink基于异步和增量的算法, 可以支持TB级别的state
  • 应用可拓展 flink为应用程序的拓展提供了强大的兼容性支持, 不论业务逻辑修改还是并行度变更, 你可以从容的应对业务的迭代

关键特性

event-time与watermark

flink支持3种时间模型:

  • Processing time 指数据进入到操作符的系统时间. 是最简单的时间概念, 不需要在流与机器之间协调. 它可以提供最好的性能和最低的延迟. 由于processing time是运行时指定的, 因此程序在这种时间模型下, 每次执行的结果都不一样.
  • Event time 是记录实际产生的时间. 在进入flink之前就已经内嵌在记录当中. 在event-time, 程序处理的进度有数据来决定, 跟机器时钟没有关系. event-time程序必须定义如何产生水位线watermark, 水位线可以描述程序的进度. event-time可以反映客观事实, 基于事件时间的程序的计算结果是最准确的, 并且每次执行结果都不变.
  • Ingestion time 数据进入flink的时间, source操作符用其当前时间作为每个记录的时间, 基于事件的操作符(比如window)都会引用这个时间.

times_clocks

watermark

watermark是flink用来评估程序进度的机制. 在实际环境中数据不可避免的会产生乱序, watermark就是用来告诉flink低于它的记录都已经处理完毕了.

watermark作为stream的一部分随着记录在stream中流动, 它的本质就是一个时间戳. 在乱序的流中, watermark意味着早于它的记录都已经到达flink. 如下图

times_clocks

有的操作符消费多条输入流, 例如union, join, 或者跟在keyBy()partition()函数后面的操作符, 这些操作符的当前事件时间就是它的输入流中最小的事件时间.

times_clocks

如何理解watermark呢

假设一个场景, flink收到的数据绝大多数都是顺序的, 偶尔有几个乱序, 但最多不会超过1min. 即00:00产生的记录最迟01:00可以被flink消费到.

反过来讲, 当前时刻01:00, flink可以拿到最早00:00产生的记录; 02:00拿到最早01:00产生的记录…以此类推. 那么相对于当前时间T, 水位线就是T-1. 即当前时间减去1分钟, 在这之前的记录都已处理完毕, 不会再有更早的记录出现.

这是建立在消费紧跟着当前时间来模拟的. 对应到event-time, “当前时间”就可以认为是当前接收到记录的最大事件时间, 上面的公式就变成: max(T) - 1

如何生成watermark呢

接上述例子, 数据有1分钟的乱序

1
2
3
4
5
6
7
8
9
env.addSource(...)
.flatMap(new EventExtraction())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.minutes(1)) {

@Override
public long extractTimestamp(Event element) {
return element.getEventTime();
}
});

window

窗口是处理无界流的心脏. 窗口将stream切分成了有宽度的”Buckets”, 在这之上我们可以进行计算.

1
2
3
4
5
6
7
8
9
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"

篇幅原因, 我们主要讨论keyed window, 以及其中keyBy, window, trigger和function api

window的生命周期

以[12:00, 12:05)的窗口为例, 属于这个窗口的第一个元素到达window操作符时, 这个窗口被创建. 当水位线越过12:06(水位线始终比当前最大事件时间小1分钟)时, 窗口被完全删除

window function

flink有两类window function

  • 可累加的函数 reduce, aggregate, fold就属于这类函数. 它们每收到一条记录就进行计算, 记录不在内存中停留, 只会保留聚合结果. 这种函数由于是增量计算, 且不会存储大量数据, 所以效率很高. 但是能够处理的场景有限, 比如uv没法处理, 另外这类函数拿不到窗口的上下文来访问时间与state信息.
1
2
3
4
5
6
7
8
9
10
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
  • ProcessWindowFunction 可以拿到包含了窗口所有记录的Iterable对象, 和一个包含了Time与State信息的Context对象, 这使得它提供了比其它函数更具灵活性. 不过这是以牺牲了性能与资源消耗为代价, 因为它无法增量计算, window的元素都缓存在内存中.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}

触发器

触发器决定了窗口函数何时执行, flink用4个常量来描述触发时的行为:

  • CONTINUE: 什么都不做
  • FIRE: 触发计算
  • PURGE: 清空窗口里的元素
  • FIRE_AND_PURGE: 触发计算并在之后清空窗口里的元素

默认情况下窗口会在watermark越过窗口的结束时间时调用。比如5min宽度的窗口[12:00,12:05),1min乱序的watermark,会在12:06结束时触发计算。

基于processing-time的程序由于没有watermark,所以会在12:05结束时触发

用户可能不愿意等到6min之后才看到结果,最好是每30s计算一个结果。对于event-time的程序,flink在不破坏watermark机制的前提下用Pane巧妙的解决了这个问题。

基于processing-time的程序无法做到在窗口期内触发计算

Pane是flink根据触发间隔(本例中是30s)而在逻辑上划分的数据块(block),即一个window会划分成n个pane,n = window_size / pane_size。当水位线越过pane的end_tine时触发计算,计算的数据集就是pane里的数据。

注意

如果窗口函数是ProccessFunction,则窗口里的数据默认会在窗口销毁时清空。换句话说pane会包含之前pane的数据,这部分数据如何处理呢?flink提供了触发器的包装类PurgingTrigger,它会在每次pane触发计算后清空窗口的数据。

1
2
3
4
5
6
7
source
.keyBy(Order::getCategoryCode)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.trigger(PurgingTrigger.of(ContinuousEventTimeTrigger.of(Time.seconds(30))))
.process(new TotalMetric(startFrom, false))
.name("transform-alluser-total")
.uid("transform-alluser-total");

checkpoint

Flink提供了一种容错机制,可以恢复应用程序的state到最近的一致性状态。该机制可确保即使出现故障,程序的状态最终也将以Exactly-Once的语义回溯数据流中的每条记录。这个机制就是Checkpoint.

当应用程序失败时, flink会停止数据流, 系统会重启程序并将操作符重置到最近一次成功的checkpoint. 重启的程序处理的所有记录都会保证不与前一次checkpoint里的数据重复.

flink分布式快照的核心是stream栅栏(Barriers). 这些Barrier从source开始注入到数据流中, 作为流的一部分随着记录一起发送到下游.

如图barriers将数据流分割成了若干部分, 每个barrier前面的记录进入到当前的快照, 之后的记录会进入下一次快照. Barriers不会打断数据流因此它非常轻量.

不同快照的barriers可以同时出现在数据流中, 这意味着多个快照可能会并发执行.

stream_barriers

Barrier从source开始注入到流中, 当中间的操作符收到上游发送过来的全部barrier, 它在完成state快照后会把barrier发送给它所有的下游stream. 一旦sink收到了它所有上游stream的barrier, 会向checkpoint协调器发送快照完毕的ack消息. 所有的sink发送ack后快照就认为执行成功了. 快照的成功标志着属于当前快照的所有记录(包含产生的所有子孙记录)都已经通过了完整的数据流拓扑.

stream_aligning

  • 一旦操作符接收到了上游某一个stream的barrier, 则在收到其它stream的barrier之前都不会再处理这个stream的数据. 否则它会把分属于两个快照的数据混合到一起
  • 在等待其它stream的barrier的同时, 操作符会将来自这个stream的数据缓存下来
  • 当收到最后一个barrier, 操作符在完成自身的输出后把自己的barrier也发送出去
  • 最后, 它开始恢复处理记录, 先处理缓存中的, 然后是来自stream的

注意

align buffer是flinkExactly-Once语义的保证, 只有在当前快照的记录都处理完了才会处理下一份快照的数据. 如果启用的是At-Least-Once语义, 则操作符会跳过align, 持续处理记录. 这样记录可能会在多份快照中重复.

flink默认开启Exactly-Once, 手动开启的方法

1
CheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

state

backend

flink提供了3个开箱即用的backend:

  • MemoryStateBackend 将数据以Objects的形式存放到Java Heap, Key/Value state和window操作符用Hash表来存放values, 触发器等等. checkpoint触发时, 会将state的快照作为ack的一部分发送到JobManager, JobManager将它们存放到自己的堆内存中. 适合在本地开发和调试使用.
  • FsStateBackend 需要配置一个文件系统的url, 比如”hdfs://namenode:8020/flink/checkpoints”或”file:///data/flink/checkpoints”. FsStateBackend同样是将state存放到内存中, checkpoint发生时将快照写到配置的文件系统中. 由于state存放在内存, 对其的访问都非常高效, 序列化只会在checkpoint时发生. 这个Backend的大小受限于JVM, 有OutOfMemorys的风险.
  • RocksDBStateBackend 也需要配置一个文件系统url, RocksDBStateBackend将state存放到RocksDB数据库中, RocksDB把数据文件存放到TaskManager的本地目录中. checkpoint触发时, RocksDBStateBackend将快照存放到配置的文件系统中. 这个Backend支持的state大小受限于可用的磁盘空间. RocksDBStateBackend是唯一一个支持增量checkpoint的Backend

    与FsStateBackend不同, RocksDBStateBackend每次访问state都要经过序列化/反序列化, 因此会有一定的性能损耗.

如何选择合适的backend呢

  • 如果数据量不大, 足以放入内存, 那么使用FsStateBackend可以获得更高的性能. 否则的话推荐使用RocksDBStateBackend

    这个数据量的边界可以参考现有实时看板的DAU, 处理最高150w的用户id绰绰有余.

State Time-To-Live (TTL)

flink提供了ttl机制, 所有的集合类型的state都支持记录级别的ttl, 这意味着list元素或map记录可以单独过期.

ttl在RocksDBStateBackend时非常有用. 因为手动删除(state.clear())只是标记删除, state的体积不会减少, 只有在下次访问时才会真正删除. 所以RocksDBStateBackend的state会越来越大, 对于长期驻留在state里的顽固记录只有在下次启动程序时删除(因为RocksDB只会加载访问的数据). 这在其它两个backend上不会发生

ttl的配置非常简单

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000)
.build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

注意

flink目前(1.10版本)只支持ProcessingTime的过期, EventTime的ttl会在后续的版本里添加.

uid

任何程序不可避免的会产生迭代, 当业务逻辑改变时, flink如何让旧的state兼容新的程序呢? 这里要分两个部分来讨论

  • 应用程序的拓扑改变了

当应用程序从检查点重启时, flink将存储在保存点中的state与应用程序的有状态运算符进行匹配, 这个匹配就是基于操作符的id来实现的. 每个操作符都有一个默认ID,该ID根据操作符在程序拓扑中的位置来决定. 因此没有修改的程序总是可以从检查点里恢复.

然而默认的id可能会随着程序的改变而改变, 开发者因此为操作符需要明确分配一个id, 只要id不变程序就能正常重启. 分配id很简单:

1
val mappedEvents: DataStream[(Int, Long)] = events.map(new MyStatefulMapFunc()).uid("mapper-1")

由于operator ID存放在检查点中, 必须跟程序启动时的id相等, 因此建议给所有将来有可能会升级的操作符分配一个唯一的ID

  • 程序的并行度发生改变

对于Operator state, state不会固定属于某一个子任务, flink在程序重启时采用轮询的方式为子任务分配state

对于Keyed state, 每个Keyed state逻辑上绑定到一个唯一的<parallel-operator-instance, key>组合, 一个key只会属于一个唯一的并行实例. 这个组合又被组织成Key Groups, key group由若干连续的key组成, 是flink重新分发state的原子单位, key group的数量等于程序最大并行度. 当程序的并行度发生改变, 每个操作符的并行实例分配到的key group随之变化, 但key相对于最高并行度的映射关系没有变, 因此可以任意拓展, 这其实就是一致性hash的实现.

keyed state rescale